/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package cn.ac.iie.dpl.data.mmloader.worker;

import cn.ac.iie.dpl.data.mmloader.globalParas.GlobalParas;
import cn.ac.iie.dpl.data.mmloader.mq.RocketProducer;
import iie.mm.client.ClientAPI;
import java.io.ByteArrayOutputStream;
import org.apache.log4j.Logger;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;

/**
 * Worker that drains Avro records from an in-memory queue, uploads the binary
 * values of each record to the multimedia store, replaces those values with
 * the generated storage keys and hands the rewritten records, batch by batch,
 * to {@link RocketProducer}.
 *
 * <p>An instance holds a {@link SimpleDateFormat}, which is not thread-safe,
 * so each instance should be executed by one thread only.
 *
 * @author Li Mingyang
 */
public class MMLoader implements Runnable {

    public static Logger logger = Logger.getLogger(MMLoader.class.getName());

    /** Not read or written by this class. */
    public Schema outSchema = null;
    /** Not read or written by this class. */
    public Set<String> outFieldNames = new HashSet<String>();
    private ClientAPI client = null;
    private String srcSchemaName = null;
    /** Hour-granularity pattern; used to truncate a record time to the start of its local hour. */
    private SimpleDateFormat secondFormat = new SimpleDateFormat("yyyy-MM-dd HH");
    private LinkedBlockingQueue<GenericRecord> inBuf;

    /**
     * Wires the worker to its input queue and multimedia client. Must be
     * called before the worker is started.
     *
     * @param srcSchemaName name of the source schema; used as the lookup key
     *                      into the {@code GlobalParas} configuration maps
     * @param b             queue the input records are polled from
     * @param mmclient      client used to store binary values
     */
    public void init(String srcSchemaName, LinkedBlockingQueue<GenericRecord> b, ClientAPI mmclient) {
        this.srcSchemaName = srcSchemaName;
        this.client = mmclient;
        this.inBuf = b;
    }

    /**
     * Processes batches until the executing thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            this.mmloaderAndSend();
        }
    }

    /**
     * Repeatedly builds and sends batches. A failure while handling a batch
     * is logged and the next batch is started; the loop ends once the thread
     * has been interrupted.
     */
    private void mmloaderAndSend() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.loadAndSendBatch();
            } catch (Exception ex) {
                logger.error(ex, ex);
            }
        }
    }

    /**
     * Makes up to {@code GlobalParas.batchSize} poll attempts (10 seconds
     * each) on the input queue, converts every usable record and sends the
     * collected documents as one message. Nothing is sent when no document
     * was collected.
     *
     * @throws Exception if encoding or sending the batch fails
     */
    private void loadAndSendBatch() throws Exception {
        DatumWriter<GenericRecord> docWriter = new GenericDatumWriter<GenericRecord>(GlobalParas.dpldocSchema);
        ByteArrayOutputStream docBytes = new ByteArrayOutputStream();
        BinaryEncoder docEncoder = new EncoderFactory().binaryEncoder(docBytes, null);
        GenericArray<ByteBuffer> docSet = new GenericData.Array<ByteBuffer>(
                GlobalParas.batchSize, GlobalParas.dpldocsSchema.getField("doc_set").schema());

        boolean interrupted = false;
        try {
            for (int i = 0; i < GlobalParas.batchSize; i++) {
                GenericRecord singleRecord;
                try {
                    singleRecord = this.inBuf.poll(10, TimeUnit.SECONDS);
                } catch (InterruptedException ex) {
                    logger.error("split the one data from for " + srcSchemaName + " error: " + ex, ex);
                    // Stop collecting; whatever was gathered so far is still sent below.
                    interrupted = true;
                    break;
                }
                if (singleRecord == null) {
                    // Poll timed out; this still counts as one attempt.
                    continue;
                }
                GenericRecord singleOutRecord = this.convert(singleRecord);
                if (singleOutRecord == null) {
                    // Unusable record: already logged, skip it without losing the rest of the batch.
                    continue;
                }
                docWriter.write(singleOutRecord, docEncoder);
                docEncoder.flush();
                docSet.add(ByteBuffer.wrap(docBytes.toByteArray()));
                docBytes.reset();
            }
            if (docSet.size() == 0) {
                return;
            }
            // mm() may have re-asserted an interrupt after the last poll. Clear it for the
            // duration of the send (restored in the finally block) so the collected batch is
            // handed over with a clean flag.
            // NOTE(review): assumes the producer could be disturbed by a pending interrupt - confirm.
            if (Thread.interrupted()) {
                interrupted = true;
            }
            this.sendBatch(docSet);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Wraps the collected documents in a record of
     * {@code GlobalParas.dpldocsSchema}, encodes it and passes it to
     * {@link RocketProducer}.
     *
     * @param docSet encoded documents of the batch
     * @throws Exception if encoding or sending fails
     */
    private void sendBatch(GenericArray<ByteBuffer> docSet) throws Exception {
        GenericRecord docsRecord = new GenericData.Record(GlobalParas.dpldocsSchema);
        docsRecord.put("doc_schema_name", GlobalParas.srcDesScheMap.get(srcSchemaName));
        docsRecord.put("doc_set", docSet);
        docsRecord.put("doc_desc", GlobalParas.srcToDesDocDesc.get(srcSchemaName));
        docsRecord.put("user_desc", new HashMap<String, String>());

        DatumWriter<GenericRecord> docsWriter = new GenericDatumWriter<GenericRecord>(GlobalParas.dpldocsSchema);
        ByteArrayOutputStream docsBytes = new ByteArrayOutputStream();
        BinaryEncoder docsEncoder = new EncoderFactory().binaryEncoder(docsBytes, null);
        docsWriter.write(docsRecord, docsEncoder);
        docsEncoder.flush();
        RocketProducer.sendMessage(GlobalParas.srcDesTopicMap.get(srcSchemaName), docsBytes.toByteArray(), docSet.size());
    }

    /**
     * Builds the outgoing record for one input record: uploads every
     * configured binary column and stores the generated key in its place,
     * then copies the remaining non-empty value maps.
     *
     * @param singleRecord input record taken from the queue
     * @return the outgoing record, or {@code null} when the input lacks the
     *         {@code long_s} map, the configured time value or the
     *         {@code bytes_s} map (the problem is logged)
     * @throws Exception if building a key or filling the record fails
     */
    @SuppressWarnings("unchecked") // the value maps come untyped out of the generic Avro record
    private GenericRecord convert(GenericRecord singleRecord) throws Exception {
        Map<Utf8, Long> timeKeyvals = (Map<Utf8, Long>) singleRecord.get("long_s");
        if (timeKeyvals == null) {
            logger.error("long :" + GlobalParas.srcTimeColumnNameMap.get(srcSchemaName) + " of table " + srcSchemaName + " value is  error,the entire record is " + singleRecord);
            return null;
        }
        Long timeInSeconds = timeKeyvals.get(new Utf8(GlobalParas.srcTimeColumnNameMap.get(srcSchemaName)));
        if (timeInSeconds == null) {
            logger.error("long :" + GlobalParas.srcTimeColumnNameMap.get(srcSchemaName) + " of table " + srcSchemaName + " value is  error,the entire record is " + singleRecord);
            return null;
        }
        Map<Utf8, Object> mmVals = (Map<Utf8, Object>) singleRecord.get("bytes_s");
        if (mmVals == null) {
            logger.error("bytes of table " + srcSchemaName + " value is  error,the entire record is " + singleRecord);
            return null;
        }

        GenericRecord singleOutRecord = new GenericData.Record(GlobalParas.dpldocSchema);
        // The map contains the time value, so it is known to be non-empty here.
        singleOutRecord.put("long_s", timeKeyvals);

        String timekey = this.hourKey(timeInSeconds.longValue());
        List<String> names = GlobalParas.srcSchemaColumnNameMap.get(srcSchemaName);
        // The binary columns are finished here: each value is uploaded and replaced by its key.
        Map<String, Object> bytesMapSet = new HashMap<String, Object>();
        for (String name : names) {
            String mmKey = timekey + "@" + UUID.randomUUID().toString();
            logger.info("mmKey===" + mmKey);
            this.mm((ByteBuffer) mmVals.get(new Utf8(name)), mmKey);
            // The key is recorded even when mm() had nothing to upload for this column.
            bytesMapSet.put(name, ByteBuffer.wrap(mmKey.getBytes("UTF-8")));
        }
        if (!bytesMapSet.isEmpty()) {
            singleOutRecord.put("bytes_s", bytesMapSet);
        }

        copyNonEmptyMap(singleRecord, "string_s", singleOutRecord, "string_s");
        copyNonEmptyMap(singleRecord, "int_s", singleOutRecord, "int_s");
        copyNonEmptyMap(singleRecord, "float_s", singleOutRecord, "float_s");
        copyNonEmptyMap(singleRecord, "double_s", singleOutRecord, "double_s");
        // NOTE(review): the target name "booleans_s" differs from the source name "boolean_s" and
        // from the naming of every other field - looks like a typo; confirm against dpldocSchema.
        copyNonEmptyMap(singleRecord, "boolean_s", singleOutRecord, "booleans_s");
        // All fields are done.
        return singleOutRecord;
    }

    /**
     * Copies a map-valued field from one record to another, leaving the
     * target field unset when the source map is {@code null} or empty.
     */
    private static void copyNonEmptyMap(GenericRecord from, String fromField, GenericRecord to, String toField) {
        Map<?, ?> values = (Map<?, ?>) from.get(fromField);
        if (values != null && !values.isEmpty()) {
            to.put(toField, values);
        }
    }

    /**
     * Truncates a time to the start of its hour by formatting it with the
     * hour-granularity pattern and parsing the text back.
     *
     * @param timeInSeconds time in seconds since the epoch
     * @return the truncated time in seconds as decimal text; the formatted
     *         text itself if it cannot be parsed back
     */
    private String hourKey(long timeInSeconds) {
        String formatted = this.secondFormat.format(new Date(timeInSeconds * 1000L));
        try {
            return String.valueOf(this.secondFormat.parse(formatted).getTime() / 1000L);
        } catch (ParseException ex) {
            logger.error(ex, ex);
            return formatted;
        }
    }

    /**
     * Stores one binary value under the given key, retrying every 20 ms
     * until the client accepts it. Nothing is stored when the data is
     * {@code null} or empty or when the key is {@code null}.
     *
     * <p>An interrupt received while retrying does not abort the upload; it
     * is remembered and re-asserted on the thread before returning.
     *
     * @param bytesdata value to store; only its readable bytes are sent
     * @param mmKey     key to store the value under
     */
    private void mm(ByteBuffer bytesdata, String mmKey) {
        if (bytesdata == null) {
            logger.info("binary data is null,multimedia data key is " + mmKey);
            return;
        }
        byte[] payload = readableBytes(bytesdata);
        if (payload.length == 0 || mmKey == null) {
            logger.info("binary data size is " + payload.length + ",multimedia data key is " + mmKey);
            return;
        }
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    long begin = System.currentTimeMillis();
                    this.client.put(mmKey, payload);
                    long end = System.currentTimeMillis();
                    logger.info("put one multimedia data to mm db use " + (end - begin) + " ms,size is " + payload.length);
                    return;
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        interrupted = true;
                    }
                    logger.error(e, e);
                }
                try {
                    Thread.sleep(20L);
                } catch (InterruptedException ie) {
                    // Keep retrying so the value is not lost; the flag is restored below.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the bytes between the buffer's position and limit without
     * changing the buffer. The backing array is returned directly when it
     * holds exactly those bytes; otherwise they are copied.
     */
    private static byte[] readableBytes(ByteBuffer buffer) {
        if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() == 0
                && buffer.remaining() == buffer.array().length) {
            return buffer.array();
        }
        byte[] copy = new byte[buffer.remaining()];
        buffer.duplicate().get(copy);
        return copy;
    }
}
